package net.srt.quartz.task;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.srt.api.module.data.integrate.DataAccessApi;
import net.srt.api.module.data.integrate.DataOdsApi;
import net.srt.api.module.data.integrate.constant.AccessMode;
import net.srt.api.module.data.integrate.constant.CommonRunStatus;
import net.srt.api.module.data.integrate.constant.TaskType;
import net.srt.api.module.data.integrate.dto.DataAccessDto;
import net.srt.api.module.data.integrate.dto.DataAccessTaskDto;
import net.srt.api.module.data.integrate.dto.DataOdsDto;
import net.srt.framework.common.utils.ExceptionUtils;
import net.srt.quartz.utils.CronUtils;
import org.springframework.stereotype.Component;
import srt.cloud.framework.dbswitch.common.type.ProductTypeEnum;
import srt.cloud.framework.dbswitch.core.service.IMetaDataByJdbcService;
import srt.cloud.framework.dbswitch.core.service.impl.MetaDataByJdbcServiceImpl;
import srt.cloud.framework.dbswitch.data.config.DbswichProperties;
import srt.cloud.framework.dbswitch.data.domain.DbSwitchResult;
import srt.cloud.framework.dbswitch.data.service.MigrationService;
import srt.cloud.framework.dbswitch.data.util.BytesUnitUtils;

import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @ClassName DataAccessTask
 * @Author zrx
 * @Date 2022/10/26 13:12
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class DataAccessTask {

	private final DataAccessApi dataAccessApi;
	private final DataOdsApi dataOdsApi;

	public void syncData(String dataAccessId) {
		log.info("start run data-access");
		DataAccessDto dataAccessDto = dataAccessApi.getById(Long.parseLong(dataAccessId)).getData();
		DbswichProperties dbswichProperties = dataAccessDto.getDataAccessJson();
		DataAccessTaskDto dataAccessTaskDto = new DataAccessTaskDto();
		dataAccessTaskDto.setProjectId(dataAccessDto.getProjectId());
		dataAccessTaskDto.setDataAccessId(Long.parseLong(dataAccessId));
		dataAccessTaskDto.setRunStatus(CommonRunStatus.RUNNING.getCode());
		dataAccessTaskDto.setStartTime(new Date());
		dataAccessTaskDto.setId(dataAccessApi.addTask(dataAccessTaskDto).getData());
		MigrationService service = buildMigrationService(dataAccessDto, dbswichProperties, dataAccessTaskDto);
		DbSwitchResult dbSwitchResult;
		try {
			//执行任务
			dbSwitchResult = service.run();
		} catch (Exception e) {
			log.error("syncTask error：" + e.getMessage(), e);
			dataAccessTaskDto.setEndTime(new Date());
			dataAccessTaskDto.setRunStatus(CommonRunStatus.FAILED.getCode());
			dataAccessTaskDto.setNextRunTime(TaskType.ONE_TIME_FULL_PERIODIC_INCR_SYNC.getCode().equals(dataAccessDto.getTaskType()) ? CronUtils.getNextExecution(dataAccessDto.getCron()) : null);
			dataAccessTaskDto.setErrorInfo(ExceptionUtils.getExceptionMessage(e));
			dataAccessApi.updateTask(dataAccessTaskDto);
			return;
		}
		dataAccessTaskDto.setEndTime(new Date());
		dataAccessTaskDto.setDataCount(dbSwitchResult.getTotalRowCount().get());
		dataAccessTaskDto.setTableSuccessCount(dbSwitchResult.getTotalTableSuccessCount().get());
		dataAccessTaskDto.setTableFailCount(dbSwitchResult.getTotalTableFailCount().get());
		dataAccessTaskDto.setByteCount(BytesUnitUtils.bytesSizeToHuman(dbSwitchResult.getTotalBytes().get()));
		dataAccessTaskDto.setRunStatus(dbSwitchResult.getIfAllSuccess().get() ? CommonRunStatus.SUCCESS.getCode() : CommonRunStatus.FAILED.getCode());
		dataAccessTaskDto.setErrorInfo(dbSwitchResult.getIfAllSuccess().get() ? null : String.format("有%s张表同步失败，可以在同步结果查看！", dataAccessTaskDto.getTableFailCount()));
		dataAccessTaskDto.setNextRunTime(TaskType.ONE_TIME_FULL_PERIODIC_INCR_SYNC.getCode().equals(dataAccessDto.getTaskType()) ? CronUtils.getNextExecution(dataAccessDto.getCron()) : null);
		//更新最终的同步记录
		dataAccessApi.updateTask(dataAccessTaskDto);
		log.info("run data-access success");
	}

	/**
	 * 构建同步服务
	 *
	 * @param dataAccessDto
	 * @param dbswichProperties
	 * @param dataAccessTaskDto
	 * @return
	 */
	private MigrationService buildMigrationService(DataAccessDto dataAccessDto, DbswichProperties dbswichProperties, DataAccessTaskDto dataAccessTaskDto) {
		AtomicLong totalRowCount = new AtomicLong(0);
		AtomicLong totalBytes = new AtomicLong(0);
		AtomicLong totalTableSuccessCount = new AtomicLong(0);
		AtomicLong totalTableFailCount = new AtomicLong(0);
		//构建同步参数
		return new MigrationService(dbswichProperties, tableResult -> {
			//添加每张表的同步结果
			dataAccessApi.addTaskDetail(dataAccessTaskDto.getProjectId(), dataAccessTaskDto.getId(), dataAccessTaskDto.getDataAccessId(), tableResult);
			//如果是ods接入，添加 dataOds
			if (AccessMode.ODS.getValue().equals(dataAccessDto.getAccessMode())) {
				boolean addOds = true;
				if (!tableResult.getIfSuccess().get()) {
					//如果表没创建成功，return
					IMetaDataByJdbcService jdbcService = new MetaDataByJdbcServiceImpl(ProductTypeEnum.MYSQL);
					if (!jdbcService.tableExist(dbswichProperties.getTarget().getUrl(), dbswichProperties.getTarget().getUsername(), dbswichProperties.getTarget().getPassword(), tableResult.getTargetTableName())) {
						addOds = false;
					}
				}
				if (addOds) {
					DataOdsDto dataOdsDto = new DataOdsDto();
					dataOdsDto.setDataAccessId(dataAccessTaskDto.getDataAccessId());
					dataOdsDto.setTableName(tableResult.getTargetTableName());
					dataOdsDto.setProjectId(dataAccessDto.getProjectId());
					dataOdsDto.setRemarks(tableResult.getTableRemarks());
					dataOdsDto.setRecentlySyncTime(tableResult.getSyncTime());
					dataOdsApi.addOds(dataOdsDto);
				}
			}
			totalRowCount.getAndAdd(tableResult.getSyncCount().get());
			totalBytes.getAndAdd(tableResult.getSyncBytes().get());
			if (!tableResult.getIfSuccess().get()) {
				totalTableFailCount.getAndIncrement();
			} else {
				totalTableSuccessCount.getAndIncrement();
			}
			//同步更新总的同步结果
			dataAccessTaskDto.setDataCount(totalRowCount.get());
			dataAccessTaskDto.setTableSuccessCount(totalTableSuccessCount.get());
			dataAccessTaskDto.setTableFailCount(totalTableFailCount.get());
			dataAccessTaskDto.setByteCount(BytesUnitUtils.bytesSizeToHuman(totalBytes.get()));
			dataAccessApi.updateTask(dataAccessTaskDto);
		});
	}
}


